package com.deep.cdc;

import com.deep.bean.ConnectConfig;
import com.deep.bean.TransformConfig;
import com.deep.common.ArgsConfig;
import com.deep.common.ArgsUtil;
import com.deep.common.ProfileConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Flink SQL job that copies rows of the MySQL table {@code t_um_rec} from the configured
 * source database to the same-named table in the configured target database.
 *
 * <p>The source side is read through the {@code mysql-cdc} connector and the target side is
 * written through the {@code jdbc} connector. Only rows whose {@code RECID} is strictly greater
 * than the {@code recid} program argument are forwarded.
 *
 * <p>Original author's note: the triggers need to be deleted (presumably the triggers on the
 * target table - confirm before running).
 */
public class SyncUmRec {

    /**
     * Column definitions and primary-key clause shared by the source and the target table.
     * Both DDL statements use this single definition so that the two schemas cannot drift
     * apart, which {@code insert ... select *} relies on.
     */
    private static final String TABLE_SCHEMA =
            "RECID  int , " +
            "UNIQUEID  string , " +
            "TASKNUM  string , " +
            "RECSTATEID  int , " +
            "RECSTATENAME  string , " +
            "RECSRCID  int , " +
            "RECSRCNAME  string , " +
            "RECTYPEID  int , " +
            "RECTYPENAME  string , " +
            "MAINTYPEID  int , " +
            "MAINTYPENAME  string , " +
            "SUBTYPEID  int , " +
            "SUBTYPENAME  string , " +
            "RECLEVELID  int , " +
            "RECLEVELNAME  string , " +
            "RECGRADEID  int , " +
            "RECGRADENAME  string , " +
            "RECDESC  string , " +
            "ADDRESS  string , " +
            "PARTCODE  string , " +
            "DISTRICTCODE  string , " +
            "DISTRICTNAME  string , " +
            "STREETCODE  string , " +
            "STREETNAME  string , " +
            "COMMUNITYCODE  string , " +
            "COMMUNITYNAME  string , " +
            "CELLCODE  string , " +
            "CELLNAME  string , " +
            "DUTYGRIDID  int , " +
            "DUTYGRIDNAME  string , " +
            "COORDX  double , " +
            "COORDY  double , " +
            "PATROLID  int , " +
            "PATROLNAME  string , " +
            "VERIFYMSGSTATEID  int , " +
            "CHECKMSGSTATEID  int , " +
            "CREATETIME  timestamp(0) , " +
            "PROCSTARTTIME  timestamp(0) , " +
            "ARCHIVETIME  timestamp(0) , " +
            "ENDTIME  timestamp(0) , " +
            "DEADLINE  timestamp(0) , " +
            "WARNINGTIME  timestamp(0) , " +
            "LIMITINFO  string , " +
            "TIMEUNIT  int , " +
            "RECLIMIT  double , " +
            "WARNINGLIMIT  double , " +
            "USEDTIME  double , " +
            "REMAINTIME  double , " +
            "USEDTIMECHAR  string , " +
            "REMAINTIMECHAR  string , " +
            "LASTPOSTPONEDDAYS  int , " +
            "POSTPONEDDAYS  int , " +
            "DISPLAYSTYLE  int , " +
            "OCCURTIME  timestamp(0) , " +
            "ACCEPTTIME  timestamp(0) , " +
            "ESTABLISHTIME  timestamp(0) , " +
            "DISPATCHTIME  timestamp(0) , " +
            "FUNCDEALTIME  timestamp(0) , " +
            "MAPEXTENT  string , " +
            "READFLAG  int , " +
            "MEDIAUPLOADTOTALNUM  int , " +
            "MEDIAUPLOADNUM  int , " +
            "MEDIAVERIFYTOTALNUM  int , " +
            "MEDIAVERIFYNUM  int , " +
            "MEDIACHECKTOTALNUM  int , " +
            "MEDIACHECKNUM  int , " +
            "REPORTPICTOTALNUM  int , " +
            "REPORTWAVTOTALNUM  int , " +
            "REPORTVIDEOTOTALNUM  int , " +
            "REPORTPICNUM  int , " +
            "REPORTWAVNUM  int , " +
            "REPORTVIDEONUM  int , " +
            "VERIFYPICTOTALNUM  int , " +
            "VERIFYWAVTOTALNUM  int , " +
            "VERIFYVIDEOTOTALNUM  int , " +
            "VERIFYPICNUM  int , " +
            "VERIFYWAVNUM  int , " +
            "VERIFYVIDEONUM  int , " +
            "CHECKPICTOTALNUM  int , " +
            "CHECKWAVTOTALNUM  int , " +
            "CHECKVIDEOTOTALNUM  int , " +
            "CHECKPICNUM  int , " +
            "CHECKWAVNUM  int , " +
            "CHECKVIDEONUM  int , " +
            "ESTABLISHCONDID  int , " +
            "ESTABLISHCONDNAME  string , " +
            "GATHERFLAG  int , " +
            "DISPLAYSTYLEID  int , " +
            "STORAGEID  int , " +
            "HASQSREC  int , " +
            "PATROLDEALFLAG  int , " +
            "ACTPROPERTYID  int , " +
            "LASTUPDATETIME  timestamp(0) , " +
            "ACTPROPERTYNAME  string , " +
            "URGENTFLAG  int , " +
            "IMAGEX  double , " +
            "IMAGEY  double , " +
            "IMAGEZ  double , " +
            "IMAGEID  string , " +
            "LEADERID  int , " +
            "evaluateOrderScore  int , " +
            "evaluateOrderInfo  string , " +
            "evaluateOrderUserId  int , " +
            "evaluateOrderUserName  string , " +
            "evaluateScore  int , " +
            "evaluateInfo  string , " +
            "evaluateUserId  int , " +
            "reporterName  string , " +
            "mobile  string , " +
            "gender  string , " +
            "questionType  string , " +
            "area  string , " +
            "workOrderNo  string , " +
            "showTel  string , " +
            "houseCode  string , " +
            "leaderSupersiveStateID  int , " +
            "aacceptorNo  string , " +
            "showFlag  int , " +
            "auditStatus  int , " +
            "typeFlag  string , " +
            "auditBackTime  timestamp(0) , " +
            "auditPassTime  timestamp(0) , " +
            "auditTime  timestamp(0) , " +
            "auditUserId  int , " +
            "auditUserName  string , " +
            "evaluateOrderTime  timestamp(0) , " +
            "createUserId  int , " +
            "createUserName  string , " +
            "pubEvaluateScore  int , " +
            "pubEvaluateUserId  bigint , " +
            "pubEvaluateInfo  string , " +
            "pubEvaluateUserName  string , " +
            "iscomponent  int , " +
            "compmaintypeid  bigint , " +
            "compmaintypename  string , " +
            "compsubtypeid  bigint , " +
            "compsubtypename  string , " +
            "compid  bigint , " +
            "compname  string , " +
            "pickLocation  string , " +
            "appealType  int , " +
            "    PRIMARY KEY (`RECID`) NOT ENFORCED";

    /**
     * Entry point: registers the source and target tables and submits the insert statement.
     *
     * @param args command-line arguments, parsed by {@link ArgsUtil#getMap(String[])}; they must
     *             yield a profile name and a {@code recid} lower bound
     * @throws IllegalArgumentException if no {@code recid} was supplied
     */
    public static void main(String[] args) {
        ArgsConfig argsConfig = ArgsUtil.getMap(args);
        Integer recid = argsConfig.getRecid();
        if (recid == null) {
            // Concatenating a null bound would produce "RECID >  null", which cannot match rows.
            throw new IllegalArgumentException(
                    "recid argument is required: only rows with RECID greater than it are synced");
        }

        TransformConfig transformConfig = ProfileConfig.getTransformConfig(argsConfig.getProfile());
        ConnectConfig sourceConfig = transformConfig.getSource();
        ConnectConfig targetConfig = transformConfig.getTarget();

        // Parallelism is not set here, so the environment default applies.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Register the table the job reads from.
        tableEnv.executeSql(buildSourceDdl(sourceConfig));

        // Register the table the job writes to.
        tableEnv.executeSql(buildTargetDdl(targetConfig));

        // recid is an Integer, so appending it cannot inject SQL.
        TableResult tableResult = tableEnv.executeSql(
                "insert into target_t_um_rec select * from source_t_um_rec where RECID >  " + recid);

        // NOTE(review): print() is kept from the original; it looks like it is relied on to keep
        // main alive while the job runs - confirm, TableResult#await() may be the clearer choice.
        tableResult.print();
    }

    /**
     * Builds the {@code create table} statement for the CDC source table
     * {@code source_t_um_rec}, backed by the physical table {@code t_um_rec}.
     *
     * <p>{@code 'debezium.skipped.operations'='d'} is handed to the connector so that delete
     * operations are skipped.
     *
     * @param sourceConfig connection settings of the source database
     * @return the DDL statement
     */
    private static String buildSourceDdl(ConnectConfig sourceConfig) {
        return "create table source_t_um_rec " +
                "(" +
                TABLE_SCHEMA +
                ")WITH (" +
                "    'connector' = 'mysql-cdc'," +
                "    'hostname' = " + sqlLiteral(sourceConfig.getHostname()) + ",  " +
                "    'port' = " + sqlLiteral(sourceConfig.getPort()) + ",   " +
                "    'username' = " + sqlLiteral(sourceConfig.getUsername()) + ",  " +
                "    'password' = " + sqlLiteral(sourceConfig.getPassword()) + ",   " +
                "    'database-name' = " + sqlLiteral(sourceConfig.getDatabaseName()) + ",  " +
                "    'table-name' = 't_um_rec'," +
                "   'debezium.skipped.operations'='d'" +
                ") ";
    }

    /**
     * Builds the {@code create table} statement for the JDBC sink table
     * {@code target_t_um_rec}, backed by the physical table {@code t_um_rec}.
     *
     * @param targetConfig connection settings of the target database
     * @return the DDL statement
     */
    private static String buildTargetDdl(ConnectConfig targetConfig) {
        String jdbcUrl = "jdbc:mysql://" + targetConfig.getHostname() + ":" + targetConfig.getPort()
                + "/" + targetConfig.getDatabaseName()
                + "?rewriteBatchedStatements=true&useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=UTC";

        return "create table target_t_um_rec " +
                "(" +
                TABLE_SCHEMA +
                ")WITH (" +
                " 'connector' = 'jdbc', " +
                " 'driver' = 'com.mysql.cj.jdbc.Driver', " +
                " 'url' = " + sqlLiteral(jdbcUrl) + ",  " +
                " 'username' = " + sqlLiteral(targetConfig.getUsername()) + ",  " +
                "  'password' = " + sqlLiteral(targetConfig.getPassword()) + ",   " +
                "  'table-name' = 't_um_rec', " +
                "  'connection.max-retry-timeout' = '60s' " +
                ")";
    }

    /**
     * Renders a value as a single-quoted SQL string literal, doubling any embedded single
     * quote so that values such as passwords containing {@code '} do not break the DDL.
     *
     * @param value the value to embed; {@code null} is rendered as the text {@code null},
     *              matching plain string concatenation
     * @return the quoted literal
     */
    private static String sqlLiteral(Object value) {
        return "'" + String.valueOf(value).replace("'", "''") + "'";
    }
}
